# -*- coding: utf-8 -*- #
"""
Time                2023/04/17 19:32
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                health.py
Description:        HTTP endpoints that forward incoming messages to CEC topics.
"""
import json
from conf.common import YAML_CONFIG
from lib.line_protocol_parser import InfluxDBLineParser
from fastapi import APIRouter, Request
from app.schemas import CecMessage
from sysom_utils import SysomFramework


# Router on which this module's endpoints are registered.
router = APIRouter()

# Producer obtained once at import time; the endpoint handlers in this module
# use it to publish messages and flush them.
cec_producer = SysomFramework.cec_producer()


@router.post("/dispatch")
async def dispatch_to_cec(msg: CecMessage):
    """Publish ``msg.data`` to the topic ``msg.topic`` if that topic is allowed.

    The allow-list is read from the service configuration
    (``custom.limit_topic``); any value that is not a list is treated as an
    empty allow-list, so every topic is rejected.

    Returns:
        A dict with ``code`` (0 on success, 1 on failure), ``err_msg`` and
        ``data`` (always an empty string for this endpoint).
    """
    configured = YAML_CONFIG.get_service_config().custom.limit_topic
    allowed_topics = configured if isinstance(configured, list) else []

    if msg.topic not in allowed_topics:
        return {"code": 1, "err_msg": "Topic not allowed", "data": ""}

    try:
        cec_producer.produce(msg.topic, msg.data)
        cec_producer.flush()
    except Exception as exc:
        # Report producer failures through the response envelope.
        return {"code": 1, "err_msg": str(exc), "data": ""}
    else:
        return {"code": 0, "err_msg": "", "data": ""}


@router.post("/line_protocol")
async def dispatch_by_line_protocol(request: Request):
    """Parse a line-protocol request body and publish it to a CEC topic.

    The raw request body is decoded and handed to ``InfluxDBLineParser``. The
    parsed measurement is used as the topic name, and the parsed tags and
    fields are merged into a single payload (a field overrides a tag with the
    same key). If the payload has a string ``extra`` entry, it is decoded as
    JSON on a best-effort basis; when decoding fails the string is kept as is.

    The allow-list is read from the service configuration
    (``custom.limit_topic``); any value that is not a list is treated as an
    empty allow-list, so every topic is rejected.

    Args:
        request: Incoming request whose body carries the line-protocol text.

    Returns:
        A dict with ``code`` (0 on success, 1 on failure), ``err_msg`` and
        ``data``. On success ``data`` is the payload that was published; on
        failure it is an empty string.
    """
    try:
        body_text = (await request.body()).decode()
        measurement, tags, fields = InfluxDBLineParser().parse_line(body_text)
    except Exception as e:
        # The body is client-controlled: report undecodable or unparsable
        # input through the same error envelope as every other failure,
        # rather than letting the exception escape the handler.
        return {"code": 1, "err_msg": str(e), "data": ""}

    topics = YAML_CONFIG.get_service_config().custom.limit_topic
    if not isinstance(topics, list):
        topics = []
    if measurement not in topics:
        return {"code": 1, "err_msg": "Topic not allowed", "data": ""}

    data = {**tags, **fields}

    extra = data.get("extra")
    if isinstance(extra, str):
        try:
            data["extra"] = json.loads(extra)
        except (ValueError, RecursionError):
            # Best effort only: "extra" is not valid JSON (or is nested too
            # deeply to decode), so forward the original string unchanged.
            pass

    try:
        cec_producer.produce(measurement, data)
        cec_producer.flush()
    except Exception as e:
        return {"code": 1, "err_msg": str(e), "data": ""}
    return {"code": 0, "err_msg": "", "data": data}

